package com.parkingwang.learning.rx1;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;

public class ConnectableObservableDemo {

    public static void main(String[] args) {

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onNext("2");
                subscriber.onNext("3");
                subscriber.onNext("4");
            }
        });

        //将普通被观察者转换为可连接观察者
        //connect()之前订阅的观察者才能接收事件，connect()之后订阅的观察者无法获取观察者发送的事件
        //replay()不管connect()前后都可以接收到数据
//        ConnectableObservable<String> connectableObservable = observable.publish();
        ConnectableObservable<String> connectableObservable = observable.replay();

        //可连接观察者转为普通观察者
//        Observable<String> observable1 = connectableObservable.refCount();


        connectableObservable.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext1---" + s);
            }
        });

        //connect()让观察者主动发送数据,调用此方法才会发送数据
        connectableObservable.connect();

        //再次订阅
        connectableObservable.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext2---" + s);
            }
        });


    }

}
